package com.ybg.framework.core.zookeeper;

import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

public class ZkDistributedLock extends ZkDistributedLockParent implements Lock {
    private final static Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class);

    private static Map<ZkClient, Map<String, LockKeyChangeListener>> zkDataListener = new HashMap<ZkClient, Map<String, LockKeyChangeListener>>();

    private ReentrantLock localLock = new ReentrantLock(true);
    private Condition localCondition = localLock.newCondition();
    private LockKeyChangeListener dataListener;
    private String lockKey;
    private Object initLock = new Object();
    private String localAddress;
    private String processName;
    private String lockPath;

    public ZkDistributedLock(IZkLockConfig config, String lockKey) {
        super(config);
        this.lockKey = lockKey;
        Preconditions.checkArgument(!Strings.isNullOrEmpty(lockKey), "ZkDistributedLock 需指定lockKey");
        Preconditions.checkArgument(lockKey.indexOf("/") == -1, "lockKey不可包函/");
        initLockPath();
        if (zkDataListener.get(super.getZkClient()) == null
                || zkDataListener.get(super.getZkClient()).get(lockKey) == null) {
            synchronized (initLock) {
                String basePath = super.getConfig().getBasePath().replaceFirst("(.*)?/$", "$1");
                if (!super.getZkClient().exists(basePath)) {
                    LOG.debug("分布式锁主路径不存在,准备创建!{}", basePath);
                    super.getZkClient().createPersistent(basePath, "true");
                }
                LOG.debug("分布式锁主路径不存在,已创建!{}", super.getConfig().getBasePath());
                if (zkDataListener.get(super.getZkClient()) == null) {
                    zkDataListener.put(super.getZkClient(), new HashMap<String, LockKeyChangeListener>());
                }
                if (zkDataListener.get(super.getZkClient()).get(lockKey) == null) {
                    this.dataListener = new LockKeyChangeListener(this);
                    zkDataListener.get(super.getZkClient()).put(lockKey, this.dataListener);
                    super.getZkClient().subscribeDataChanges(lockPath, this.dataListener);
                    LOG.debug("初始化锁路径监听,localPath:{}", this.lockPath);
                }
            }
        }
        try {
            localAddress = InetAddress.getLocalHost().getHostAddress();
            processName = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        } catch (UnknownHostException e) {
            localAddress = ManagementFactory.getRuntimeMXBean().getName().split("@")[1];
        }
        this.dataListener = (LockKeyChangeListener) zkDataListener.get(super.getZkClient()).get(lockKey);
        dataListener.addDistributedLock(this);
    }
    protected boolean createRemoteLock() {
        String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
        boolean res = false;
        try {
            super.getZkClient().create(lockPath, lockData + "@1", CreateMode.EPHEMERAL);
            LOG.debug("{}-{},获取锁成功!", new Object[] { lockData, lockPath });
            res = true;
        } catch (Exception ex) {
            String remoteLockData = super.getZkClient().readData(lockPath, true);
            if (remoteLockData != null && remoteLockData.startsWith(lockData)) {
                int count = Integer.parseInt(remoteLockData.split("@")[3]) + 1;
                super.getZkClient().writeData(lockPath, lockData + "@" + count);
                LOG.debug("{}-{},获取锁成功,获取次数:{}!", new Object[] { lockData, lockPath, count });
                res = true;
            }
        }
        return res;
    }
    protected void removeRemoteLock() {
        String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
        String remoteLockData = super.getZkClient().readData(lockPath, true);
        if (remoteLockData != null && remoteLockData.startsWith(lockData)) {
            super.getZkClient().delete(lockPath);
            LOG.debug("{}-{},远程锁释放成功!", new Object[] { lockData, lockPath });
        }
        localLock.unlock();
    }
    @Override
    public void lock() {
        String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
        for (;;) {
            LOG.debug("{}-{},准备获取锁!", new Object[] { lockData, lockPath });
            localLock.lock();
            boolean res = createRemoteLock();
            if(res){
                break;
            }
            try {
                LOG.debug("{}-{},获取锁失败,进入等待状态!", new Object[] { lockData, lockPath });
                long waitFlag = localCondition.awaitNanos(TimeUnit.MILLISECONDS.toNanos(super.getConfig().getRetryMaxTime() == null ? 300 : super.getConfig().getRetryMaxTime()));
                if(waitFlag>0){
                    LOG.debug("{}-{},获取锁收到信息苏醒,重新获取!", new Object[] { lockData, lockPath });
                }else{
                    LOG.debug("{}-{},获取锁自动苏醒,重新获取!", new Object[] { lockData, lockPath });
                }
            } catch (InterruptedException e) {
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
        for (;;) {
            LOG.debug("{}-{},准备获取锁!", new Object[] { lockData, lockPath });
            localLock.lockInterruptibly();
            boolean res = createRemoteLock();
            if(res){
                break;
            }
            LOG.debug("{}-{},获取锁失败,进入等待状态!", new Object[] { lockData, lockPath });
            long waitFlag = localCondition.awaitNanos(TimeUnit.MILLISECONDS.toNanos(super.getConfig().getRetryMaxTime() == null ? 300 : super.getConfig().getRetryMaxTime()));
            if(waitFlag>0){
                LOG.debug("{}-{},获取锁收到信息苏醒,重新获取!", new Object[] { lockData, lockPath });
            }else{
                LOG.debug("{}-{},获取锁自动苏醒,重新获取!", new Object[] { lockData, lockPath });
            }
        }
    }

    @Override
    public boolean tryLock() {
        String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
        LOG.debug("{}-{},准备获取锁!", new Object[] { lockData, lockPath });
        boolean res = localLock.tryLock();
        if (res) {
            res = createRemoteLock();
        }
        if(!res){
            LOG.debug("{}-{},获取锁失败!", new Object[] { lockData, lockPath });
        }
        return res;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        long lastTime = System.nanoTime();
        long now, offsetTime;
        boolean res = false;
        String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
        for (;;) {
            now = System.nanoTime();
            offsetTime = now - lastTime;
            if (unit.toNanos(time) - offsetTime > 0) {
                LOG.debug("{}-{},准备获取锁!", new Object[] { lockData, lockPath });
                res = localLock.tryLock(unit.toNanos(time) - offsetTime, unit);
                if (res) {
                    res = createRemoteLock();
                    if(res){
                        break;
                    }
                    now = System.nanoTime();
                    offsetTime = unit.convert(now - lastTime, TimeUnit.NANOSECONDS);
                    if (time - offsetTime > 0) {
                        LOG.debug("{}-{},获取锁失败,进入等待状态!", new Object[] { lockData, lockPath });
                        long waitFlag = localCondition.awaitNanos(unit.toNanos(time - offsetTime));
                        if(waitFlag>0){
                            LOG.debug("{}-{},获取锁收到信息苏醒,重新获取!", new Object[] { lockData, lockPath });
                        }else{
                            LOG.debug("{}-{},获取锁自动苏醒,重新获取!", new Object[] { lockData, lockPath });
                        }
                    }else{
                        break;
                    }
                } else {
                    break;
                }
            } else {
                break;
            }
        }
        if(!res){
            LOG.debug("{}-{},获取锁失败!", new Object[] { lockData, lockPath });
        }
        return res;
    }

    @Override
    public void unlock() {
        localLock.lock();
        try {
            removeRemoteLock();
        } finally {
            localLock.unlock();
        }
    }
    /**
     *
     * 该方法返回的Condition中的方法，当前仅对本地线程有效.
     * await 仅释放本地锁，远程锁将继续保持直到调用unlock或应用中止
     * @return
     * @see java.util.concurrent.locks.Lock#newCondition()
     */
    @Override
    public Condition newCondition() {
        return localLock.newCondition();
    }

    public void destory() {
        LOG.debug("销毁锁,{}!", lockPath);
        this.dataListener.reomveDistributedLock(this);
        super.destory();
    }

    private void initLockPath() {
        if (super.getConfig().getBasePath() == null) {
            lockPath = "/";
        } else if (super.getConfig().getBasePath().endsWith("/")) {
            lockPath = super.getConfig().getBasePath();
        } else {
            lockPath = super.getConfig().getBasePath() + "/";
        }
        lockPath = lockPath + lockKey;
    }

    protected ReentrantLock getLocalLock() {
        return localLock;
    }

    protected Condition getLocalCondition() {
        return localCondition;
    }

    private class LockKeyChangeListener implements IZkDataListener {
        private Set<ZkDistributedLock> lockSet = new HashSet<ZkDistributedLock>();

        public void addDistributedLock(ZkDistributedLock lock) {
            lockSet.add(lock);
        }

        public void reomveDistributedLock(ZkDistributedLock lock) {
            lockSet.remove(lock);
        }

        public LockKeyChangeListener(ZkDistributedLock lock) {
            this.addDistributedLock(lock);
        }

        @Override
        public void handleDataChange(String dataPath, Object data) throws Exception {
            LOG.debug("远程锁发生变更,{}-{}!", new Object[] { lockPath, data });
        }

        @Override
        public void handleDataDeleted(String dataPath) throws Exception {
            LOG.debug("远程锁被释放,{}!", lockPath);
            for (ZkDistributedLock lock : lockSet) {
                lock.getLocalLock().lock();
                try {
                    LOG.debug("通知本地线程获取锁,{}!", lockPath);
                    lock.getLocalCondition().signalAll();
                } finally {
                    lock.getLocalLock().unlock();
                }
            }
        }

    }
}
